 1.Flink常用API详解之Flink DataSet常用API
   
   1).DataSource
   对DataSet批处理而言，较为频繁的操作是读取HDFS中的文件数据，因为这里主要介绍两个
   DataSource组件
        基于集合
        fromCollection(Collection)，主要是为了方便测试使用
        基于文件
        readTextFile(path)，基于HDFS中的数据进行计算分析
   2).Transformation
   Transformation      Description
   Map                 在算子中得到一个元素并生成一个新元素data.map { x => x.toInt }
   FlatMap             在算子中获取一个元素, 并生成任意个数的元素
                       data.flatMap { str =>str.split(" ") }
   MapPartition        类似Map, 但是一次Map一整个并行分区
                       data.mapPartition { in => inmap { (_, 1) } }
   Filter              如果算子返回true则包含进数据集, 如果
                       不是则被过滤掉data.filter { _ >100 }
   Reduce              通过将两个元素合并为一个元素, 从而将一组
                       元素合并为一个元素data.reduce { _ + _ }
   ReduceGroup         将一组元素合并为一个或者多个元素data.reduceGroup 
                       { elements =>elements.sum }
   Aggregate           讲一组值聚合为一个值, 聚合函数可以看作是内置的Reduce函数
                       data.aggregate(SUM,0).aggregate(MIN,2)data.sum(0).min(2)
   Distinct            去重
   Join                按照相同的Key合并两个数据集
                       input1.join(input2).where(0).equalTo(1)同时也可以选择进行合并的时
                       候的策略,是分区还是广播,是基于排序的算法还是基于哈希的算法
                       input1.join(input2,JoinHint.BROADCAST_HASH_FIRST)
                       .where(0).equalTo(1)
   OuterJoin           外连接, 包括左外, 右外, 完全外连接等
                       left.leftOuterJoin(right).where(0).equalTo(1) { (left, right) => ... }
   CoGroup             二维变量的Reduce运算, 对每个输入数据集中的字段进行分组,然后
                       join这些组input1.coGroup(input2).where(0).equalTo(1)
   Cross               笛卡尔积input1.cross(input2)
   Union               并集input1.union(input2)
   Rebalance           分区重新平衡, 以消除数据倾斜input.rebalance()
   Hash-Partition      按照Hash分区input.partitionByHash(0)
   Range-Partition     按照Range分区input.partitionByRange(0)
   CustomParititioning 自定义分区input.partitionCustom(partitioner: Partitioner[K], key)
   First-n             返回数据集中的前n个元素input.first(3)
   partitionByHash     按照指定的key进行hash分区
   sortPartition       指定字段对分区中的数据进行排序
   
   Flink针对DataSet也提供了大量的已经实现的算子，和DataStream计算很类似
   Map：输入一个元素，然后返回一个元素，中间可以进行清洗转换等操作
   FlatMap：输入一个元素，可以返回0个、1个或者多个元素
        Filter：过滤函数，对传入的数据进行判断，符合条件的数据会被留下
		Reduce：对数据进行聚合操作，结合当前元素和上一次Reduce返回的值进行聚合操作，然
后返回一个新的值
        Aggregations：sum()、min()、max()等
   3).Sink
   Flink针对DataStream提供了大量的已经实现的数据目的地（Sink），具体如下所示
   writeAsText()：将元素以字符串形式逐行写入，这些字符串通过调用每个元素的toString()方法来
获取
   writeAsCsv()：将元组以逗号分隔写入文件中，行及字段之间的分隔是可配置的，每个字段的值来
自对象的toString()方法
   print()/pringToErr()：打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
   Flink提供了一批内置的Connector，其中有的Connector会提供对应的Sink支持，如1.1节中表所
示

 2.Flink Table API和SQL_API
   
   Apache Flink提供了两种顶层的关系型API，分别为Table API和SQL，Flink通过Table API&SQL实现了
批流统一。其中Table API是用于Scala和Java的语言集成查询API，它允许以非常直观的方式组合关系运
算符（例如select，where和join）的查询。Flink SQL基于Apache Calcite 实现了标准的SQL，用户可
以使用标准的SQL处理数据集。Table API和SQL与Flink的DataStream和DataSet API紧密集成在一起，
用户可以实现相互转化，比如可以将DataStream或者DataSet注册为table进行操作数据。值得注意的
是，Table API and SQL目前尚未完全完善，还在积极的开发中，所以并不是所有的算子操作都可以通
过其实现。
   
   依赖：
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.11.1</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <!-- Either... -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- or... -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>   
   基于TableAPI的案例：
package com.lagou.table;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class TableApiDemo {
    public static void main(String[] args) throws Exception {
        //Flink执行环境env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //用env，做出Table环境tEnv
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //获取流式数据源
        DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("name", 10));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });
        //将流式数据源做成Table
        Table table = tEnv.fromDataStream(data, $("name"), $("age"));
        //对Table中的数据做查询
        Table name = table.select($("name"));
        //将处理结果输出到控制台
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(name, Row.class);
        result.print();
        env.execute();
    }
}   